Published on

Apache Doris:高性能分布式分析型数据库技术解析

Authors
  • avatar
    Name
    Liant
    Twitter

分布式数仓产品-Apache Doris

官方文档

  • 基于MPP(大规模并行处理)架构的分析型数据库
  • 性能卓越,PB级别数据毫秒/秒级响应
  • 适用于高并发、低延时下的多维分析、实时报表等场景
  • 由百度自研,2017年开源,2018年贡献给Apache社区后更名为Apache Doris
  • 百度内部统称其为“数据仓库Palo”,百度智能云上提供Palo的企业级托管版本

Doris在数据流中的定位

Doris在数据流中的定位

Doris的多种导数方式

Doris中有Routine Load、Broker Load和Stream Load等丰富内置的导数方式:

  • 支持简单过滤和转换函数
  • 可以 容忍少量的数据异常
  • 支持 ACID 和导数幂等性 。
  1. Routine Load:当前只支持消费Kafka的实时数据,按每批条数、导入间隔和并发数等设置导数参数。

  2. Broker Load :Doris 集群中一种可选进程,主要用于支持 Doris 读写远端存储上的文件和目录。

    支持以下远端存储:

    • Apache HDFS
    • 阿里云 OSS
    • 腾讯云 CHDFS
    • 腾讯云 GFS (1.2.0 版本支持)
    • 华为云 OBS (1.2.0 版本后支持)
    • 亚马逊 S3
    • JuiceFS (2.0.0 版本支持)
  3. Stream Load :

Stream load 主要适用于导入本地文件,或通过程序导入数据流中的数据。

Stream load 是一个同步的导入方式,用户通过发送 HTTP 协议发送请求将本地文件或数据流导入到 Doris 中,同步执行导入并返回导入结果,用户可直接通过请求的返回体判断本次导入是否成功。

3 Doris发展历程

Doris发展历程

Doris总体框架

Doris总体框架

Google Mesa(数据模型)

Mesa满足一系列复杂且具有挑战性的用户和系统需求,包括接近实时的数据提取和查询能力,以及针对大数据和查询量的高可用性,可靠性,容错性和可伸缩性。但是Mesa本身不提供SQL查询引擎所以借鉴了下面。

Apache Impala (MPP Query Engine)

Impala是一个非常好的MPP SQL查询引擎,做更多的查询优化,在速度上做到了很好。但是缺少比较完美的分布式存储引擎,所以需要集成下面。

MPP (Massively Parallel Processing),即大规模并行处理。简单来说,MPP是将任务并行的分散到多个服务器和节点上,在每个节点上计算完成后,将各自部分的结果汇总在一起得到最终的结果(与Hadoop相似)

注:MPPDB与Hadoop都是将运算分布到节点中独立运算后进行结果合并(分布式计算),但由于依据的理论和采用的技术路线不同而有各自的优缺点和适用范围。

我们现在大数据存储与处理趋势:MPP DB+Hadoop混搭使用,用MPP处理PB级别的、高质量的结构化数据,同时为应用提供丰富的SQL和事物支持能力;用Hadoop实现半结构化、非结构化数据处理。

这样可以同时满足结构化、半结构化和非结构化数据的高效处理需求。

Apache ORCFile (存储格式,编码和压缩)

只访问查询涉及的列,能大量降低系统I/O;列数据相对来说比较类似,压缩比更高;每一列由一个线索来处理,更有利于查询的并发处理。

Doris内部架构

Doris内部架构

Doris的系统架构组成主要有:

  • BackEnd(后端节点),简称BE。
  • FrontEnd(前端节点),简称FE。
  • bdbje(BerkekeyDB Java Edition),负责元数据操作日志的持久化、FE 高可用等功能。

FE

主要负责查询的编译,分发和元数据管理。

FE 包含的三种角色的理解

  • leader跟follower,主要是用来达到元数据的高可用,保证单节点宕机的情况下,元数据能够实时地在线恢复,而不影响整个服务。

  • observer只是用来扩展查询节点,就是说如果在发现集群压力非常大的情况下,需要去扩展整个查询的能力,那么可以加observer的节点。observer不参与任何的写入,只参与读取。

    • 管理元数据(库, 表, 分区, tablet副本等信息), 执行SQL语句命令。
    • FE高可用部署, 使用复制协议选主和主从同步元数据, 所有的元数据修改操作,由FE Leader节点完成, FE Follower节点可执行读操作。元数据的读写满足顺序一致性。 FE的节点数目采用2n+1, 可容忍n个节点故障。当FE Leader故障时, 从现有的Follower节点重新选主, 完成故障切换。Observer节点仅从 Leader节点进行元数据同步,不参与选举。能够横向扩展以提供元数据的读服务的扩展性。
    • FE的SQL layer对用户提交的SQL进行解析, 分析, 语义分析和关系代数优化, 生产逻辑执行计划。
    • FE的Planner负责把逻辑计划转化为可分布式执行的物理计划, 分发给一组BE。
    • FE监督,管理BE的上下线, 根据BE的健康状态和存活数, 维持tablet副本的数量。
    • FE协调数据导入, 保证数据导入的一致性。

BE

主要负责数据的存储、以及查询计划的执行

  1. BE管理tablet副本, tablet是table经过分区分桶形成的子表, 采用列式存储。
  2. BE受驱动FE, 创建或删除子表。
  3. BE接收FE分发的物理执行计划并指定BE coordinator节点, 在BE coordinator的调度下, 与其他BE worker共同协作完成执行。
  4. BE读取本地的列存储引擎获取数据,并通过索引和谓词下沉快速过滤数据。
  5. BE后台执行compact任务, 减少查询时的读放大。
  6. 数据导入时, 由FE指定BE coordinator,将数据以fanout的形式写入到tablet多副本所在的BE上。

Doris核心性能

Doris核心性能

Doris数据分区

Doris数据分区

Doris多副本存储、自动数据迁移、副本均衡

Doris多副本存储、自动数据迁移、副本均衡

Doris数据分片

Doris数据分片

Doris存储引擎

Doris存储引擎

Doris数据模型简介

在Doris中,数据以表(Table)的形式进行逻辑上的描述。一张表包括行(Row)和列(Column)。Row即用户的一行数据,Column用于描述一行数据中不同的字段。Column可以分为两大类:Key(维度列)和value(指标列)。

Doris的数据模型主要分为3类:

  • Aggregate模型(聚合模型)
  • Uniq模型(唯一模型)
  • Duplicate 模型(冗余模型)
数据模型在建表时已经确定,且无法修改。所以,选择一个合适的数据模型非常重要。 Doris的数据模型

常用命令

查看FE的运行状态
show frontends\G;

查看BE的运行状态
show backends\G;

添加follow角色
ALTER SYSTEM ADD FOLLOWER "follower_host:edit_log_port";

添加observer角色
ALTER SYSTEM ADD OBSERVER "observer_host:edit_log_port";

查看集群中每张表的存储占用状态
show data;

查看单张表在集群的存储分布情况
ADMIN SHOW REPLICA DISTRIBUTION FROM db_name.table_name;


显示物化视图
show alter table materialized view from example_db where TableName='table_name';
show alter table rollup;
show alter table materialized view;

确认物化视图(或rollup)已经生效
desc dns_logs_from_kafka all;

删除rollup:
ALTER TABLE db_name.table_name DROP ROLLUP rollup_name;

删除物化视图:
drop materialized view view_name on table_name;

撤销正在创建的rollup(此时数据还在写入rollup)CANCEL ALTER TABLE ROLLUP FROM db_name.table_name;

查看某张表的索引情况(创建成功之后)show index from table_name;

查看正在创建的(所有表)索引(或修改的字段)SHOW ALTER TABLE COLUMN\G;

查看具体某张表的索引(或正在修改的字段)创建情况:
SHOW ALTER TABLE COLUMN WHERE TableName = "table_name"

取消正在创建的索引(或正在修改的字段)CANCEL ALTER TABLE COLUMN FROM table_name;

注意项

  • 创建表时 Key 列必须在所有 Value 列之前。

  • 默认情况下,表名大小写敏感,需要在安装的时候就设置好,后期不可更改(lower_case_table_names)

  • ver1.2之后,修改表名需要开启 light_schema_change=true ,是个重操作(相当于新建表,重新导入数据)。

  • 单个节点内存查询时限制默认是2GB。 show variables like "%mem_limit%";

  • 查询超时默认300s。 show variables like "%query_timeout%";

  • order by 语句,如果未加 limit,系统当前默认会自动为你添加limit 65535。

  • 合理使用 Rollup 功能,可以提高查询速度。

  • string/text类型默认长度为1048576字节(1M),修改配置项 string_type_length_soft_limit_bytes=1048576

动态修改配置: curl -X POST http://{be_ip}:{be_http_port}/api/update_config?streaming_load_max_mb=1024

Doris索引介绍

索引用于帮助快速过滤或查找数据。

目前 Doris 主要支持两类索引:

  • 内建的智能索引,包括前缀索引和 ZoneMap 索引。
  • 用户手动创建的二级索引,包括 倒排索引、 bloomfilter索引 和 ngram bloomfilter索引 。

其中 ZoneMap 索引是在列存格式上,对每一列自动维护的索引信息,包括 Min/Max,Null 值个数等等。这种索引对用户透明。

数据测试(Doris测试表)

CREATE DATABASE crm;

SHOW chatset;
SHOW COLLATION;

DROP TABLE IF EXISTS `clue_labels`;
CREATE TABLE `clue_labels`
(
    `id`       BIGINT NOT NULL COMMENT "id",
    `clue_sn` largeint NOT NULL COMMENT '线索编码',
    `label_id` INT    NOT NULL COMMENT '标签id'
) UNIQUE KEY(`id`)
  DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "enable_unique_key_merge_on_write" = "true"
);

测试环境同步测试

测试环境配置:8核处理器,同一台服务器部署Doris FE和BE 内存限制2G

方案一: 使用DataLink同步所有数据

  • 测试环境配置一次最多同步两个表,否则会导致doris挂掉,无法同步数据,而且同步速度极慢平均每秒20条
  • 测试环境数据量过大长时间同步,会FE节点会挂掉

方案二: 使用脚本批量插入数据到doris 在使用DataLink监听表变化

  • 测试环境配置由于BE负责数据的存储,长时间插入数据会导致BE节点资源不足而挂掉
  • 测试排查出原因由于表字段太多,插入速度过快导致BE内存上去而资源不足挂掉,其他表字段少,未出现BE节点挂掉
  • 解决方案:更改脚本 把需要作为筛选条件和聚合的字段同步至doris, 每条sql 100条数据,每插入1万数据 等待几秒,平均每秒500条

同步数据中遇到的编码问题

由于Doris只支持utf8 ,会导致mysql表中编码utf8mb4的一个数据无法导进doris

例如字段varchar(255)所表示的单位是字符,而一个汉字一个字母都是一字符。所以这里可以存储255个汉字或者255个字母。

utf8下,1字符=3字节。(uft-8也称之为utf-8mb3)

utf8mb4下,1字符=4字节。

存储上限

varchar的存储上限是65535字节

utf-8格式: varchar(21845)是上限(65535/3)。

utf-8mb4: varchar(16383)是上限(65535/4)

例如:

表情☺️

一个表情是占用4个字节,所以utf-8下,表情会乱码,1字符装不下,需要额外的空间。

utf8mb4下,一个表情正好是一字符,能够完美显示。

varchar(255)即表示能存放255个汉字,或255个字母,或255个表情。

查询测试

测试重复线索统计Doris接口

测试重复线索统计Mysql接口

相同查询参数,都是160万以上的数据

{
    "label_ids":["7"],
    "query_field":"phone",
    "line_ids":["3"],
    "replay_line_ids":["3","4","9","10"],
    "query_label_ids":["7"],
    "created_at":["1696089600","1703951999"],
    "replay_created_at":["1696089600","1703951999"],
    "page":1,
    "per_page":10
}
  • mysql接口查询: 2.62s
  • doris接口查询: 0.95s

以下是新上线统计:统计渠道重复手机号来访的MYSQL查询,可直接在Doris使用

SELECT
    `cl`.`id`
  , `cl`.`clue_sn`
  , `cl`.`qq`
  , `cl`.`wechat`
  , `cl`.`phone`
  , `cl`.`deal_start_at`
  , `cl`.`responsible_number`
  , `cl`.`created_at`
  , `cl`.`created_number`
  , `cl`.`signed_at`
  , (
        SELECT
            COUNT(*) AS `total`
        FROM
            `clue_list` `cl2`
        WHERE
              (`cl2`.`data_status` = 1)
          AND (EXISTS (
                          SELECT
                              *
                          FROM
                              `clue_labels` `cll2`
                          WHERE
                                (cll2.`clue_sn` = cl2.`clue_sn`)
                            AND (`cll2`.`label_id` = '33')
                      ))
          AND (`cl2`.`created_at` BETWEEN '1696089600' AND '1703951999')
          AND (EXISTS (
                          SELECT *
                          FROM `clue_order` `clo`
                          WHERE cl2.id = clo.clue_id
                      ))
          AND (cl2.phone = cl.phone AND cl2.phone <> '')
    ) AS `relay_total`
FROM
    `clue_list` `cl`
WHERE
      (`cl`.`data_status` = 1)
  AND (EXISTS (
                  SELECT
                      *
                  FROM
                      `clue_labels` `cll`
                  WHERE
                        (cll.`clue_sn` = cl.`clue_sn`)
                    AND (`cll`.`label_id` = '23')
              ))
  AND (`cl`.`business` = '2-8-1')
  AND (`cl`.`created_at` BETWEEN '1696089600' AND '1703951999')
ORDER BY
    `id` DESC
LIMIT 10 OFFSET 0;

以上是查询列表数据 其中select 里有子查询 与mysql相比需要注意

mysql count(*)没有数据会返回0

doris count(*)没有数据会返回null

例如一下sql :

select id, (select count(*) as total from B where A.id=B.a_id) as total from A

SELECT
    COUNT(DISTINCT cl2.id)                                    AS `replay_total`
  , COUNT(DISTINCT IF(clo.clue_id IS NOT NULL, cl2.id, NULL)) AS `order_clue_total`
  , COUNT(DISTINCT IF(cl2.stage_status = 3, cl2.id, NULL))    AS `sign_clue_total`
FROM
    `clue_list` `cl2`
    LEFT JOIN `clue_order` `clo`
    ON clo.clue_id = cl2.id
WHERE
      (`cl2`.`data_status` = 1)
  AND (EXISTS(
                 SELECT *
                 FROM `clue_labels` `cll2`
                 WHERE (cll2.`clue_sn` = cl2.`clue_sn`) AND (`cll2`.`label_id` = '23')
             ))
  AND (`cl2`.`business` = '1-1-7')
  AND (`cl2`.`created_at` BETWEEN '1704038400' AND '1710345599')
  AND (`cl2`.`wechat` IN (
                             SELECT
                                 `cl`.`wechat`
                             FROM
                                 `clue_list` `cl`
                             WHERE
                                   (`cl`.`data_status` = 1)
                               AND (EXISTS(
                                              SELECT
                                                  *
                                              FROM
                                                  `clue_labels` `cll`
                                              WHERE
                                                    (cll.`clue_sn` = cl.`clue_sn`)
                                                AND (`cll`.`label_id` = '48')
                                          ))
                               AND (`cl`.`business` = '1-1-7')
                               AND (`cl`.`created_at` BETWEEN '1704038400' AND '1710345599')
                               AND (`cl`.`wechat` <> '')
                         ))